%load_ext autoreload
%autoreload 2
import numpy as np
import scipy.io as sio
from scipy import sparse
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from ripser import ripser
from persim import plot_diagrams, wasserstein
from sklearn.manifold import MDS
from sklearn.decomposition import PCA
from gtda.diagrams import PairwiseDistance # For Bottleneck / Wasserstein
from dreimac import CircularCoords
import os
import warnings
warnings.filterwarnings('ignore')
Load in full resolution data and subsample by a factor of 10
def plot_data(locs, X):
plt.figure(figsize=(10, 5))
ylim = [np.min(X), np.max(X)]
rg = ylim[1] - ylim[0]
ylim = [ylim[0] - 0.1*rg, ylim[1] + 0.1*rg]
for i in range(X.shape[0]):
plt.clf()
plt.plot(locs[i, :], X[i, :])
plt.ylim(ylim)
plt.xlim([0, 1])
plt.savefig("KSFrames/{}.png".format(i), bbox_inches='tight')
def get_random_samples(X, n_samples):
"""
Sample every time series with samples chosen uniformly at
random and sorted
Parameters
----------
X: ndarray(N, T)
N time series, each of length T
n_samples: int
Number of samples to take in each time series
Returns
-------
locs: ndarray(N, n_samples)
Sample locations for each time series on the interval [0, 1]
Y: ndarray(N, n_samples)
Samples
"""
Y = np.zeros((X.shape[0], n_samples))
locs = np.random.rand(X.shape[0], n_samples)
locs = np.sort(locs, axis=1)
idx = np.linspace(0, 1, X.shape[1])
for i in range(X.shape[0]):
Y[i, :] = np.interp(locs[i, :], idx, X[i, :])
return locs, Y
np.random.seed(0)
res = sio.loadmat("KS_1_2_0.002.mat")
locs, X = get_random_samples(res['data'], 100)
plot_data(locs, X)
print(X.shape)
def sliding_csm(D, win):
"""
Perform the effect of a sliding window on an CSM by averaging
along diagonals
Parameters
----------
D: ndarray(M, N)
A cross-similarity matrix
win: int
Window length
"""
M = D.shape[0] - win + 1
N = D.shape[1] - win + 1
S = np.zeros((M, N))
J, I = np.meshgrid(np.arange(N), np.arange(M))
for i in range(-M+1, N):
x = np.diag(D, i)
x = np.array([0] + x.tolist())
x = np.cumsum(x)
x = x[win::] - x[0:-win]
S[np.diag(I, i), np.diag(J, i)] = x
return S
def do_sublevelset_filtration(x):
"""
Do a sublevelset filtration of a time series using an interval graph
Parameters
----------
x: ndarray(N)
The time series
Returns
-------
dgm: 0D persistence diagram, excluding the essential class
"""
N = x.size
# Add edges between adjacent points in the time series, with the "distance"
# along the edge equal to the max value of the points it connects
I = np.arange(N-1)
J = np.arange(1, N)
I, J = np.concatenate((I, J)), np.concatenate((J, I))
V = np.maximum(x[I], x[J])
# Add vertex birth times along the diagonal of the distance matrix
I = np.concatenate((I, np.arange(N)))
J = np.concatenate((J, np.arange(N)))
V = np.concatenate((V, x))
#Create the sparse distance matrix
D = sparse.coo_matrix((V, (I, J)), shape=(N, N)).tocsr()
dgm = ripser(D, distance_matrix=True, maxdim=0)['dgms'][0]
return dgm[np.isfinite(dgm[:, 1]), :]
def do_sublevelset_filtration_circular(x):
"""
Do a sublevelset filtration of a time series, assuming it's on
a periodic domain
Parameters
----------
x: ndarray(N)
The time series
Returns
-------
dgm: 0D persistence diagram, excluding the essential class
"""
N = x.size
# Add edges between adjacent points in the time series
I = np.concatenate((np.arange(N), np.arange(N)))
J = np.concatenate(((np.arange(N)-1)%N, (np.arange(N)+1)%N))
V = np.maximum(x[I], x[J])
# Add vertex birth times along the diagonal of the distance matrix
I = np.concatenate((I, np.arange(N)))
J = np.concatenate((J, np.arange(N)))
V = np.concatenate((V, x))
#Create the sparse distance matrix
D = sparse.coo_matrix((V, (I, J)), shape=(N, N)).tocsr()
dgm = ripser(D, distance_matrix=True, maxdim=0)['dgms'][0]
return dgm[np.isfinite(dgm[:, 1]), :]
x = X[70, :]
loc = locs[70, :]
dgm1 = do_sublevelset_filtration(x)
dgm2 = do_sublevelset_filtration_circular(x)
plt.figure(figsize=(12, 4))
plt.subplot(121)
plt.plot(loc, x)
plt.ylim([-9, 9])
plt.subplot(122)
plot_diagrams([dgm1, dgm2], labels=['interval', 'circular'])
plt.ylim([-9, 9])
print("Interval")
print(dgm1)
print("Circular")
print(dgm2)
First, compute all sublevelset filtrations using both the circular and interval methods
def get_dgms(X, sublevelset_fn):
"""
Compute the 0D sublevelset filtrations for all of the time series in X
Parameters
----------
X: ndarray(N, T)
N time series of length T
sublevelset_fn: x->dgm
A function for computing a sublevelset filtration of a time series
Returns
-------
ndarray(N, max_persistence_points, 3)
N persistence diagrams, in Giotto-TDA format
"""
dgms_list = []
max_len = 0
for i in range(X.shape[0]):
dgm = sublevelset_fn(X[i, :])
dgms_list.append(dgm)
max_len = max(max_len, dgm.shape[0])
dgms = np.zeros((X.shape[0], max_len, 3))
for i, dgm in enumerate(dgms_list):
dgms[i, 0:dgm.shape[0], 0:2] = dgm
return dgms
dgms_interval = get_dgms(X, do_sublevelset_filtration)
dgms_circular = get_dgms(X, do_sublevelset_filtration_circular)
Now, compute all pairwise Bottleneck And Wasserstein distances
plt.imshow(X)
plt.figure(figsize=(12, 12))
k = 1
Ds = {}
for metric in ['bottleneck', 'wasserstein']:
PD = PairwiseDistance(metric=metric, metric_params={'delta': 1e-5}, order=None)
for dgms, dgms_title in zip((dgms_interval, dgms_circular), ['Interval', 'Circle']):
key = "{}_{}".format(dgms_title, metric)
print(key)
D = PD.fit_transform(dgms)
plt.subplot(2, 2, k)
plt.imshow(D)
plt.title(key)
Ds[key] = np.reshape(D, (D.shape[0], D.shape[1]))
k += 1
plt.figure(figsize=(12, 12))
k = 1
for metric in ['bottleneck', 'wasserstein']:
for dgms_title in ['Interval', 'Circle']:
key = "{}_{}".format(dgms_title, metric)
D = Ds[key]
print(D.shape)
S = sliding_csm(D, 50)
plt.subplot(2, 2, k)
plt.imshow(S)
plt.title(key)
k += 1
Finally, perform circular coordinates
S = sliding_csm(Ds['Circle_wasserstein'], 50)
cc = CircularCoords(S, 100, distance_matrix=True)
thetas = cc.get_coordinates(perc=0.9, cocycle_idx=[0])
plt.figure(figsize=(10, 4))
plt.subplot(121)
plot_diagrams(cc.dgms_)
plt.subplot(122)
plt.plot(thetas)
Now let's pull out groups of time series that lie in a similar range
plt.figure(figsize=(10, 5))
ylim = [np.min(X), np.max(X)]
rg = ylim[1] - ylim[0]
ylim = [ylim[0] - 0.1*rg, ylim[1] + 0.1*rg]
n_intervals = 20
dtheta = np.pi/n_intervals
for interval, theta in enumerate(np.linspace(0, 2*np.pi, n_intervals+1)[0:n_intervals]):
fdir = "KSFibers/{}".format(interval)
if not os.path.exists(fdir):
os.mkdir(fdir)
idxs = np.arange(thetas.size)
discrep = np.abs(thetas-theta)
discrep[discrep > np.pi] = 2*np.pi - discrep[discrep > np.pi]
idxs = idxs[discrep < dtheta]
for i, idx in enumerate(idxs):
plt.clf()
plt.subplot(121)
plt.plot(np.arange(thetas.size), thetas)
plt.scatter(idxs, thetas[idxs])
plt.scatter([idx], [thetas[idx]])
plt.title("theta = {:3f}".format(thetas[idx]))
plt.subplot(122)
plt.plot(X[idx, :])
plt.ylim(ylim)
plt.savefig("{}/{}.png".format(fdir, i))